Reasoning and Descision Making under Uncertainty
¶

Portfolio Exam 2
¶

Sensor Fusion - Kalman and Particle Filter
¶

Task P2.2¶

Particle Filter¶

Realize an implementation of the Particle Filter in a programming language of your choice for a simulation of the ball-throwing example from the lecture slides. The task of your Particle Filter is to estimate the positions and velocity vectors of two balls flying simultaneously only from the observed erroneous positions over time.¶

ˆ Your implementation shall be flexible in the sense that it can handle variations similar to the ones from Task P2.1. In addition, you need to consider how to deal with more than one ball flying at the same time. How do you estimate two positions from the density? How do you define your state?

ˆ Note: There is more than one good approach here. Thus, there is no clear best solution approach in this task.

Trajectories of a multiple balls with the Launch Paramaters¶

In [ ]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go

# Function to simulate and return the projectile motion data
def projectile_motion(Initial_X, Initial_Y, Initial_Speed, Launch_Angle, num=50):
    g = 9.81  # gravity in m/s^2
    
    # Convert the launch angle to radians
    Launch_angle_rad = np.deg2rad(Launch_Angle)  # launch angle in radians

    # Calculate the total time of flight
    time_flight = (2 * Initial_Speed * np.sin(Launch_angle_rad)) / g

    # Generating time points
    delta_t = np.linspace(0, time_flight, num)  # time points 

    # Calculating the x and y coordinates
    x_real = Initial_X + Initial_Speed * np.cos(Launch_angle_rad) * delta_t + np.random.normal(0, 0.1, num)
    y_real = Initial_Y + Initial_Speed * np.sin(Launch_angle_rad) * delta_t - 0.5 * g * delta_t**2 + np.random.normal(0, 0.1, num)

    # Calculating the horizontal and vertical components of velocity
    v_x = Initial_Speed * np.cos(Launch_angle_rad)  # same (No change in Vx)
    v_y = Initial_Speed * np.sin(Launch_angle_rad) - g * delta_t  # Becomes zero after reaching peak

    # Calculating the resultant velocity
    velocity = np.sqrt(v_x**2 + v_y**2)  # Resultant magnitude of velocity

    return x_real, y_real, velocity

# Defining the launch variables for multiple balls
initial_conditions = [
    {"Initial_X": 0, "Initial_Y": 0, "Initial_Speed": 60.0, "Launch_Angle": 30.0},
    {"Initial_X": 10, "Initial_Y": 5, "Initial_Speed": 70.0, "Launch_Angle": 45.0},
    {"Initial_X": 20, "Initial_Y": 10, "Initial_Speed": 80.0, "Launch_Angle": 60.0},
]
colors = ['b', 'g', 'r']  # Colors for different trajectories

plt.figure(figsize=(12, 12))

# Subplot for Y vs X (trajectory)
plt.subplot(2, 1, 1)
for conditions, color in zip(initial_conditions, colors):
    x_real, y_real, velocity = projectile_motion(**conditions)
    
    label = f'Position curve {conditions["Initial_Speed"]} m/s, {conditions["Launch_Angle"]}°, X={conditions["Initial_X"]}, Y={conditions["Initial_Y"]}'
    plt.plot(x_real, y_real, 'o:' + color, label=label)  # Projectile path

plt.title('Projectile motion of multiple balls ')
plt.xlabel('Horizontal Distance (X) in meters')
plt.ylabel('Height (Y) in meters')
plt.grid(True)
plt.ylim(bottom=0)
plt.legend()

# Subplot for velocity vs distance
plt.subplot(2, 1, 2)
for conditions, color in zip(initial_conditions, colors):
    x_real, y_real, velocity = projectile_motion(**conditions)
    
    label = f'Velocity curve {conditions["Initial_Speed"]} m/s, {conditions["Launch_Angle"]}°, X={conditions["Initial_X"]}, Y={conditions["Initial_Y"]}'
    plt.plot(x_real, velocity, color, label=label)  # Velocity vs distance

plt.title('Velocity vs Distance for multiple balls')
plt.xlabel('Horizontal Distance (X) in meters')
plt.ylabel('Velocity (m/s)')
plt.grid(True)
plt.legend()

plt.tight_layout()
plt.show()
No description has been provided for this image

Implementation of Particle Filter¶

In [ ]:
#Initialization Function
# This function initializes the particles for the projectile motion simulation.
# Particles are initialized with random Gaussian values around the initial conditions.
# The initial conditions are the mean of the Gaussian and the standard deviation is the noise.
def initialize_projectiles(N, initial_x, initial_y, initial_speed, launch_angle, initial_noise):
    particles = np.empty((N, 4))
    particles[:, 0] = initial_x + np.random.normal(0, initial_noise, size=N)  # x
    particles[:, 1] = initial_y + np.random.normal(0, initial_noise, size=N)  # y
    particles[:, 2] = initial_speed * np.cos(np.deg2rad(launch_angle)) 
    particles[:, 3] = initial_speed * np.sin(np.deg2rad(launch_angle)) 
    return particles

# Prediction Function
# This function updates the particles' positions and velocities according to the projectile motion model.
# Noise is added to the x and y positions to account for uncertainty in the model.
def predict_projectile_motion(particles, delta_t, g, x_noise, y_noise, N): 
    particles[:, 0] += particles[:, 2] * delta_t + np.random.normal(0, x_noise, size=N)
    particles[:, 1] += particles[:, 3] * delta_t - 0.5 * g * delta_t**2 + np.random.normal(0, y_noise, size=N)
    particles[:, 2] += 0    
    particles[:, 3] += -g * delta_t     
    return particles

# Weight Update Function
# This function updates the weights of each particle based on the difference between the predicted and measured positions.
# The weights indicate how well each particle matches the measurement.
def update_particle_weights(particles, weights, measured_position, noise_std_dev, N):
    for i in range(N):
        distance = np.linalg.norm(particles[i, :2] - measured_position)
        weights[i] = (1/np.sqrt(2*np.pi*noise_std_dev**2)) * np.exp(-(distance**2)/(2*noise_std_dev**2))
    weights += 1.e-200
    weights /= sum(weights)
    return weights

# Resampling Function
# This function resamples the particles based on their weights to keep those that best match the measurements.
# using cumulative sum and np.searchsorted
def resample_particles_cumsum(particles, weights):
    particles_resampled = np.zeros_like(particles)
    N = len(particles)
    cumulative_sum = np.cumsum(weights)
    cumulative_sum[-1] = 1.
    indexes = np.searchsorted(cumulative_sum, np.random.random(N))
    particles_resampled[:] = particles[indexes]
    weights.fill(1.0/N)
    return particles_resampled


# Estimation Function
# This function computes the mean position of the particles weighted by their weights.
def estimate_position(particles, weights):
    pos = particles[:, :2]
    mean = np.average(pos, weights=weights, axis=0)
    return mean

# Effective Sample Size Function
# This function calculates the effective sample size to determine when resampling is necessary.
def effective_sample_size(weights): 
    return 1. / np.sum(np.square(weights))

# Simulate measurements of the projectile motion with noise
def simulate_projectile_measurements(t, initial_x, initial_y, initial_speed, launch_angle, g, noise_std_dev):
    theta_rad = np.radians(launch_angle)
    x_real = initial_x + initial_speed * np.cos(theta_rad) * t
    y_real = initial_y + initial_speed * np.sin(theta_rad) * t - 0.5 * g * t**2
    noise = np.random.normal(scale=noise_std_dev, size=x_real.shape)
    x_real_noisy = x_real + noise
    y_real_noisy = y_real + noise
    return x_real_noisy, y_real_noisy
In [ ]:
# Run the particle filter for 2 projectiles
N = 100

# Define the initial conditions for the first projectile
initial_x1, initial_y1, initial_speed1, launch_angle_1 = 0, 0, 8, 40

# Define the initial conditions for the second projectile
initial_x2, initial_y2, initial_speed2, launch_angle_2 = 0, 0, 10, 60

# Acceleration due to gravity
g = 9.81

# Measurement noise for both projectiles
measurement_noise1 = 0.1
measurement_noise2 = 0.2

# Process noise for x and y positions
process_noise_x = 0.1
process_noise_y = 0.3

# Initial conditions noise for both projectiles
initial_noise1 = 0.2
initial_noise2 = 0.2

# Time step for simulation
delta_t = 0.1

# Define the failure period start and duration (in time steps)
dropout_start = 15
dropout_duration = 20

# Calculate the total time of flight for the second projectile
time_flight = (3 * initial_speed2 * np.sin(np.deg2rad(launch_angle_2))) / g

# Generate time points for the simulation
t = np.arange(0.0, time_flight, delta_t)

# Generate noisy measurements for the first projectile
x_observed_1, y_observed_1 = simulate_projectile_measurements(t, initial_x1, initial_y1, initial_speed1, launch_angle_1, g, measurement_noise1)

# Generate noisy measurements for the second projectile
x_observed_2, y_observed_2 = simulate_projectile_measurements(t, initial_x2, initial_y2, initial_speed2, launch_angle_2, g, measurement_noise2)

# Initialize particles for the first projectile
particles_1 = initialize_projectiles(N, initial_x1, initial_y1, initial_speed1, launch_angle_1, initial_noise1)

# Initialize particles for the second projectile
particles_2 = initialize_projectiles(N, initial_x2, initial_y2, initial_speed2, launch_angle_2, initial_noise2)

# Initialize uniform weights for both projectiles
weights_1 = np.ones(N) / N
weights_2 = np.ones(N) / N

# Lists to store estimates and particles at each time step
estimates_1 = []
estimates_2 = []
particles_1_list = []
particles_2_list = []

# Looping through each time step to update particles and weights
for i in range(len(x_observed_1)):
    # Getting measurements for both projectiles at this time step
    z1 = np.array([x_observed_1[i], y_observed_1[i]])
    z2 = np.array([x_observed_2[i], y_observed_2[i]])
    
    # Predicting the next state of the particles for both projectiles
    particles_1 = predict_projectile_motion(particles_1, delta_t, g, process_noise_x, process_noise_y, N)
    particles_2 = predict_projectile_motion(particles_2, delta_t, g, process_noise_x, process_noise_y, N)
    
    # Storing the particles' state for plotting later
    particles_1_list.append(particles_1.copy())
    particles_2_list.append(particles_2.copy())

    # Updating weights only if not within the failure period
    if not (dropout_start <= i < dropout_start + dropout_duration):
        weights_1 = update_particle_weights(particles_1, weights_1, z1, measurement_noise1, N)
        weights_2 = update_particle_weights(particles_2, weights_2, z2, measurement_noise2, N)
    
    # Resample particles if the effective sample size falls below a threshold
    if effective_sample_size(weights_1) < N / 0.5:
        particles_1 = resample_particles_cumsum(particles_1, weights_1)
    if effective_sample_size(weights_2) < N / 0.5:
        particles_2 = resample_particles_cumsum(particles_2, weights_2)
     
    # Estimating the position based on the current particle states and weights
    estimate_1 = estimate_position(particles_1, weights_1)
    estimate_2 = estimate_position(particles_2, weights_2)

    # Storing the estimates
    estimates_1.append(estimate_1)
    estimates_2.append(estimate_2)

# Converting estimates to numpy arrays 
estimates_1 = np.array(estimates_1)
estimates_2 = np.array(estimates_2)

# Calculating the real x and y coordinates for both projectiles without noise
x_real_1 = initial_x1 + initial_speed1 * np.cos(np.deg2rad(launch_angle_1)) * t 
y_real_1 = initial_y1 + initial_speed1 * np.sin(np.deg2rad(launch_angle_1)) * t - 0.5 * g * t**2
x_real_2 = initial_x2 + initial_speed2 * np.cos(np.deg2rad(launch_angle_2)) * t 
y_real_2 = initial_y2 + initial_speed2 * np.sin(np.deg2rad(launch_angle_2)) * t - 0.5 * g * t**2

# Calculating the real velocities for both projectiles
v_real_1 = np.sqrt((initial_speed1 * np.cos(np.deg2rad(launch_angle_1)))**2 + (initial_speed1 * np.sin(np.deg2rad(launch_angle_1)) - g * t)**2)
v_real_2 = np.sqrt((initial_speed2 * np.cos(np.deg2rad(launch_angle_2)))**2 + (initial_speed2 * np.sin(np.deg2rad(launch_angle_2)) - g * t)**2)

# Calculating the estimated velocities for both projectiles
v_estimated_1 = np.sqrt(np.diff(estimates_1[:, 0], prepend=0)**2 + np.diff(estimates_1[:, 1], prepend=0)**2) / delta_t
v_estimated_2 = np.sqrt(np.diff(estimates_2[:, 0], prepend=0)**2 + np.diff(estimates_2[:, 1], prepend=0)**2) / delta_t

# Calculating the distances for both projectiles
d_real_1 = np.sqrt(x_real_1**2 + y_real_1**2)
d_real_2 = np.sqrt(x_real_2**2 + y_real_2**2)
d_estimated_1 = np.sqrt(estimates_1[:, 0]**2 + estimates_1[:, 1]**2)
d_estimated_2 = np.sqrt(estimates_2[:, 0]**2 + estimates_2[:, 1]**2)

# Setting the maximum for the x-axis and y axis
max_distance = max(x_real_1.max(), x_real_2.max())
max_height = max(y_real_1.max(), y_real_2.max())

# Ploting the trajectory of the 2 projectiles along with their estimated positions
fig = go.Figure()

# Real positions with full lines
fig.add_trace(go.Scatter(x=x_real_1, y=y_real_1, mode='lines', name='Real position of Ball 1', line=dict(color='red')))
fig.add_trace(go.Scatter(x=x_real_2, y=y_real_2, mode='lines', name='Real position of Ball 2', line=dict(color='green')))

# Observed noisy positions with dashed lines
fig.add_trace(go.Scatter(x=x_observed_1, y=y_observed_1, mode='lines', name='Observed path of projectile 1 (Noisy)', line=dict(color='green', dash='dash')))
fig.add_trace(go.Scatter(x=x_observed_2, y=y_observed_2, mode='lines', name='Observed path of projectile 2 (Noisy)', line=dict(color='blue', dash='dash')))

# Estimated positions with scatter
fig.add_trace(go.Scatter(x=estimates_1[:, 0], y=estimates_1[:, 1], mode='markers', name='Estimated position of Ball 1', marker=dict(symbol='circle', size=7, color='blue')))
fig.add_trace(go.Scatter(x=estimates_2[:, 0], y=estimates_2[:, 1], mode='markers', name='Estimated position of Ball 2', marker=dict(symbol='circle', size=7, color='red')))

# Highlight the dropout period
dropout_start_time = dropout_start * delta_t
dropout_end_time = (dropout_start + dropout_duration) * delta_t
fig.add_vrect(x0=dropout_start_time, x1=dropout_end_time, fillcolor='blue', opacity=0.3, layer='below', line_width=0, annotation_text='Dropout period', annotation_position='top left')

# Updating plot layout
fig.update_layout(
    title='True and Estimated Positions of 2 balls',
    xaxis_title='Distance (m)',
    yaxis_title='Height (m)',
    xaxis=dict(range=[0, max_distance]),
    yaxis=dict(range=[0, max(y_real_2) * 1.1]),
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1)
)

# Show plot
fig.show()

# Ploting the estimated and real velocities versus distance for the first projectile
fig1 = go.Figure()

fig1.add_trace(go.Scatter(x=d_real_1, y=v_real_1, mode='lines', name='Real Velocity 1', line=dict(color='blue')))
fig1.add_trace(go.Scatter(x=d_estimated_1, y=v_estimated_1, mode='lines', name='Estimated Velocity 1', line=dict(color='red', dash='dash')))

fig1.update_layout(
    title='Velocity vs Distance for Ball 1',
    xaxis_title='Distance (m)',
    yaxis_title='Velocity (m/s)',
    xaxis=dict(range=[0, max_distance]),
    yaxis=dict(range=[0, max(v_real_1.max(), v_estimated_1.max()) * 1.1]),
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1)
)

# Show plot
fig1.show()

# Ploting the estimated and real velocities versus distance for the second projectile
fig2 = go.Figure()

fig2.add_trace(go.Scatter(x=d_real_2, y=v_real_2, mode='lines', name='Real Velocity 2', line=dict(color='red')))
fig2.add_trace(go.Scatter(x=d_estimated_2, y=v_estimated_2, mode='lines', name='Estimated Velocity 2', line=dict(color='blue', dash='dash')))

fig2.update_layout(
    title='Velocity vs Distance for Ball 2',
    xaxis_title='Distance (m)',
    yaxis_title='Velocity (m/s)',
    xaxis=dict(range=[0, max_distance]),
    yaxis=dict(range=[0, max(v_real_2.max(), v_estimated_2.max()) * 1.1]),
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1)
)

# Show plot
fig2.show()
same plots using matplotlib¶
In [ ]:
# Plot trajectories
fig, ax = plt.subplots(figsize=(14, 7))

# Real positions
ax.plot(x_real_1, y_real_1, label='Real position of Ball 1', color='red')
ax.plot(x_real_2, y_real_2, label='Real position of Ball 2', color='green')

# Observed noisy positions
ax.plot(x_observed_1, y_observed_1, label='Observed path of projectile 1 (Noisy)', linestyle='--', color='green')
ax.plot(x_observed_2, y_observed_2, label='Observed path of projectile 2 (Noisy)', linestyle='--', color='blue')

# Estimated positions
ax.scatter(estimates_1[:, 0], estimates_1[:, 1], label='Estimated position of Ball 1', color='blue')
ax.scatter(estimates_2[:, 0], estimates_2[:, 1], label='Estimated position of Ball 2', color='red')

# Highlight dropout period
dropout_start_time = dropout_start * delta_t
dropout_end_time = (dropout_start + dropout_duration) * delta_t
ax.axvspan(dropout_start_time, dropout_end_time, color='blue', alpha=0.3, label='Dropout period')

# Update plot layout
ax.set_title('True and Estimated Positions of 2 balls')
ax.set_xlabel('Distance (m)')
ax.set_ylabel('Height (m)')
ax.set_xlim([0, max_distance])
ax.set_ylim([0, max_height * 1.5])
ax.legend(loc='upper right')

# Show plot
plt.show()

# Plot estimated and real velocities for Ball 1
fig1, ax1 = plt.subplots(figsize=(14, 7))

ax1.plot(d_real_1, v_real_1, label='Real Velocity 1', color='blue')
ax1.plot(d_estimated_1, v_estimated_1, label='Estimated Velocity 1', linestyle='--', color='red')

ax1.set_title('Velocity vs Distance for Ball 1')
ax1.set_xlabel('Distance (m)')
ax1.set_ylabel('Velocity (m/s)')
ax1.set_xlim([0, max_distance])
ax.set_ylim([0, max_height * 1.1])
ax1.legend(loc='upper right')

# Show plot
plt.show()

# Plot estimated and real velocities for Ball 2
fig2, ax2 = plt.subplots(figsize=(14, 7))

ax2.plot(d_real_2, v_real_2, label='Real Velocity 2', color='red')
ax2.plot(d_estimated_2, v_estimated_2, label='Estimated Velocity 2', linestyle='--', color='blue')

ax2.set_title('Velocity vs Distance for Ball 2')
ax2.set_xlabel('Distance (m)')
ax2.set_ylabel('Velocity (m/s)')
ax2.set_xlim([0, max_distance])
ax.set_ylim([0, max_height * 1.1])
ax2.legend(loc='upper right')

# Show plot
plt.show()
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image

1. True and Estimated Position:¶

Real positions: The red and green solid lines show the true trajectories of Ball 1 and Ball 2, respectively.

Noisy observations: The green and blue dashed lines represent the noisy observations of the ball's positions at various time steps for Ball 1 and Ball 2.

Estimated positions by particle filter: The blue and red dots indicate the position estimates provided by the particle filter for Ball 1 and Ball 2, respectively.

Dropout period: The shaded blue region highlights the time interval with increased measurement noise, simulating a sensor dropout.

2. Velocity vs Distance for Ball 1:¶

Real Velocity: The blue solid line represents the actual calculated velocity of Ball 1.

Estimated Velocity by particle filter: The red dashed line represents the velocity estimates provided by the particle filter for Ball 1.

3. Velocity vs Distance for Ball 2:¶

Real Velocity: The red solid line represents the actual calculated velocity of Ball 2.

Estimated Velocity by particle filter: The blue dashed line represents the velocity estimates provided by the particle filter for Ball 2.

These plots demonstrate the effectiveness of the Particle filter in estimating the true state of a system, even in the presence of noise and periods of high measurement uncertainty.

THE END
¶

Name: THARUN KUMAR KORINE PALLI

Matriculation Number: 5123708

Email: tharunkumar.korinepalli@study.thws.de